Skip to content
This repository has been archived by the owner on Jun 8, 2020. It is now read-only.

(Re)subscribe to a channel with correct emitter #549

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from

Conversation

mladenmarkov
Copy link

When (re)subscribing to a channel and there's already a Subscription object created, it's being reused. However, it contains the old emitter, from a previous subscription attempt.

This pull request suggests to always create a new Subscription object, to ensure future notifications are sent to the new emitter. Otherwise, the new subscription will not receive any events whatsoever.

This change allows subscriptions to be retried automatically using the retryWhen() operator.

…bject, to ensure future notifications are sent to the new emitter.
@mdvx
Copy link
Contributor

mdvx commented Mar 10, 2020

please check out
#542
it deals with overlapping issues

@mdvx
Copy link
Contributor

mdvx commented Mar 10, 2020

would the doOnDispose() at line 378 (which removes the subscription from the map) not achieve the same result?
How would retryWhen() be used?

@mladenmarkov
Copy link
Author

please check out
#542
it deals with overlapping issues

Not really overlapping issues. That pull request is for auto-reconnect, which is great. However, it does not deal with the issue that (re-)subscriptions can fail for any reason (broken connection, rate limit, etc) and then subscriber would not be notified.

Imagine subscribing for order books of 100 pairs and a rate-limited exchange (yes, Kraken, but also others). The first 20 or so subscriptions would be successful, but the rest would not. The connection is OK and there will be no auto-reconnect happening, to call the resubscribeChannels() method. The consumer gets the failures in their respective onError() handlers and retry, but without disposing the Disposable. It cannot be assumed that the disposable will be disposed before retrying, besides retryWhen doesn't seem to do that either.

Now, since the channels map already contains a Subscription object, the second call to the subscribeChannel() method (the retry) will not update the map and subsequent events will not be sent to the new consumer. That's why I think the map should be updated on each call of the subscribeChannel() method.

For how retryWhen() would be used, you can see #550. I've included a test which automatically retries subscriptions with an exp. backoff strategy.

@makarid
Copy link
Contributor

makarid commented Mar 10, 2020

This is a must feature in my opinion. Thank you @mladenmarkov for this PR.

@mdvx
Copy link
Contributor

mdvx commented Mar 10, 2020

I was referring to (which is also in PR#542 )

   * Some exchanges rate limit messages sent to the socket (Kraken), by default this method does not rateLimit the
   * messages sent out.
   * Override this method to provide a rateLimiter, and call acquire on the rate limiter,
   * to slow down out going messages.
   */
  protected void sendMessageRateLimiterAcquire() ```

i saw the #550 back off strategy, impressive.

@badgerwithagun badgerwithagun added the review_overdue Sorry the review of this is overdue. We'll get around to it soon! label Mar 27, 2020
@badgerwithagun
Copy link
Collaborator

#566 Also covers a related discussion

@badgerwithagun
Copy link
Collaborator

This project is in the process of being merged into the XChange project and no further PRs will be merged here. Once the projects have been merged, there may be a short stabilization period where there will be large-scale renaming of classes and packages, which may cause conflicts. You are advised to wait at least a week from now and then resubmit your PR on the XChange project. Thank you for your support!

@badgerwithagun
Copy link
Collaborator

You can now resubmit your PR on XChange. This project will shortly be marked as archived.

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
review_overdue Sorry the review of this is overdue. We'll get around to it soon!
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants